19.2 收发

必须对channel收发双方(G)进行包装,因为它们要携带数据项,并存储相关状态。

runtime2.go

type g struct { param unsafe.Pointer // 传递唤醒参数 } type sudog struct { g *g elem unsafe.Pointer // 数据存储空间指针 }

另外,channel还得维护发送和接收者等待队列,以及异步缓冲槽环状队列索引位置。

chan.go

type hchan struct { qcount uint // 缓冲槽有效数据项数量 closed uint32 // 是否关闭 sendx uint // 缓冲槽发送位置索引 recvx uint // 缓冲槽接收位置索引 recvq waitq // 接收者等待队列 sendq waitq // 发送者等待队列 } type waitq struct { first *sudog last *sudog }

和以往一样,sudog也实现了二级缓存复用体系。

runtime2.go

type p struct { sudogcache []*sudog // 在 procresize new(p) 时指向 sudogbuf sudogbuf [128]*sudog }

type schedt struct { sudogcache *sudog }

proc.go

func acquireSudog() *sudog { pp := mp.p.ptr() // 如果本地缓存为空 if len(pp.sudogcache) == 0 { // 从全局缓存转移一批到本地 for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil { s := sched.sudogcache sched.sudogcache = s.next s.next = nil pp.sudogcache = append(pp.sudogcache, s) } // 如果失败,则新建 if len(pp.sudogcache) == 0 { pp.sudogcache = append(pp.sudogcache, new(sudog)) } } // 从尾部提取,并调整本地缓存 n := len(pp.sudogcache) s := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] return s } func releaseSudog(s *sudog) { pp := mp.p.ptr() // 如果本地缓存已满 if len(pp.sudogcache) == cap(pp.sudogcache) { // 转移一半到全局 var first, last *sudog for len(pp.sudogcache) > cap(pp.sudogcache)/2 { n := len(pp.sudogcache) p := pp.sudogcache[n-1] pp.sudogcache[n-1] = nil pp.sudogcache = pp.sudogcache[:n-1] if first == nil { first = p } else { last.next = p } last = p }

            // 将提取的链表挂到全局
            last.next = sched.sudogcache
            sched.sudogcache = first
    }
    pp.sudogcache = append(pp.sudogcache, s)

}

sched.sudogcache缓存会在垃圾回收执行clearpools时被清理,但P本地缓存会被保留。

同步和异步收发算法有很大差异,但不知作者为什么非要将它们塞到一起。这些看起来有些巨大的函数,让人看着很不舒服。为便于分析,我们将其拆解开来。

同步

同步模式的关键是找到匹配的接收或发送方,找到则直接拷贝数据;找不到就将自身打包后放入等待队列,由另一方复制数据并唤醒。

在同步模式下,channel的作用仅是维护发送和接收者队列,数据复制与channel无关。另外在唤醒后,需要验证唤醒者身份,以此决定是否有实际的数据传递。

chan.go

func chansend1(t *chantype, c *hchan, elem unsafe.Pointer) { chansend(t, c, elem, true, getcallerpc(unsafe.Pointer(&t))) } // 参数 eq 是数据项指针 func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 同步模式 if c.dataqsiz == 0 { // 从等待队列获取接收者 sg := c.recvq.dequeue() if sg != nil { recvg := sg.g // 直接用 memmove 将数据项复制给接收者 if sg.elem != nil { syncsend(c, sg, ep) } // 唤醒检查标志,表明是由发送者唤醒 // closechan 一样会唤醒接收者,但 param = nil recvg.param = unsafe.Pointer(sg) // 唤醒接收者 goready(recvg, 3) return true } // 如果没有接收者,则打包成 sudog gp := getg() mysg := acquireSudog() // 新建,或从缓存获取复用 sudog 对象 mysg.elem = ep mysg.g = gp gp.param = nil // 将发送 sudog 放入等待队列,休眠,等待被接收者唤醒 c.sendq.enqueue(mysg) goparkunlock(&c.lock, “chan send”, traceEvGoBlockSend, 3) // 被唤醒,检查是否被 closechan 唤醒 // 此时数据已被接收者复制,无须再做处理 gp.waiting = nil if gp.param nil { if c.closed 0 { throw(“chansend: spurious wakeup”) } panic(“send on closed channel”) } gp.param = nil // 将 sudog 放回复用缓存 releaseSudog(mysg) return true } }

接收代码和发送的几乎一致,差别在于谁先进入等待队列,谁负责唤醒。编译器会将不同语法翻译成不同的函数调用。

chan.go

// chan func chanrecv1(t *chantype, c *hchan, elem unsafe.Pointer) { chanrecv(t, c, elem, true) } // x, ok := chan // for x := range chan func chanrecv2(t *chantype, c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(t, c, elem, true) return }

chan.go

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 同步模式 if c.dataqsiz == 0 { // 从等待队列获取发送者 sg := c.sendq.dequeue() if sg != nil { // 从发送者复制数据 if ep != nil { typedmemmove(c.elemtype, ep, sg.elem) } sg.elem = nil gp := sg.g

                    // 设置唤醒检查标志
                    gp.param = unsafe.Pointer(sg)
                    // 唤醒发送者,解除其阻塞
                    goready(gp, 3)
                    selected = true
                    received = true
                    return
            }
            // 如果没有发送者,打包成 sudog
            gp := getg()
            mysg := acquireSudog()
            mysg.elem = ep
            mysg.g = gp
            gp.param = nil
            // 放入等待队列,休眠,等待被发送者唤醒
            c.recvq.enqueue(mysg)
            goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
            // 被唤醒
            // 数据已被发送者复制过来
            gp.waiting = nil
            // 通过检查唤醒标志来决定是否有数据被复制
            haveData := gp.param != nil
            gp.param = nil
            // 将 sudog 放回复用缓存
            releaseSudog(mysg)
            if haveData {
                    selected = true
                    received = true
                    return
            }
            return recvclosed(c, ep)
    }

}

异步

异步模式围绕缓冲槽进行。当有空位时,发送者向槽中复制数据;有数据后,接收者从槽中获取数据。双方都有唤醒排队的另一方继续工作的责任。

chan.go

func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 异步模式 // 如果缓冲槽没有空位 for futile := byte(0); c.qcount >= c.dataqsiz; futile = traceFutileWakeup { // 打包成 sudog gp := getg() mysg := acquireSudog() mysg.g = gp mysg.elem = nil // 放入发送者等待队列,休眠。等待有空位时被唤醒 c.sendq.enqueue(mysg) goparkunlock(&c.lock, “chan send”, traceEvGoBlockSend|futile, 3) // 唤醒后,如果 qcount < dataqsiz 表示有空位,跳出循环 // 将 sudog 放回复用缓存 releaseSudog(mysg) } // 将数据复制到缓冲槽 typedmemmove(c.elemtype, chanbuf(c, c.sendx), ep) // 调整缓冲槽队列索引和数据项计数 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ // 现在缓冲槽不为空,唤醒某个排队的接收者从槽中获取数据 sg := c.recvq.dequeue() if sg != nil { recvg := sg.g goready(recvg, 3) } return true }

发送须有缓冲槽空位,而接收则须槽中有可用数据项。

chan.go

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 异步模式 // 如果缓冲槽中没有数据项 for futile := byte(0); c.qcount 0; futile = traceFutileWakeup { // 打包成 sudog gp := getg() mysg := acquireSudog() mysg.elem = nil mysg.g = gp // 放入接收等待队列,休眠。等待有数据项时被唤醒 c.recvq.enqueue(mysg) goparkunlock(&c.lock, “chan receive”, traceEvGoBlockRecv|futile, 3) // 唤醒后,qcount > 0,跳出循环 // 将 sudog 返回复用缓存 releaseSudog(mysg) } // 从缓冲槽复制数据项 if ep != nil { typedmemmove(c.elemtype, ep, chanbuf(c, c.recvx)) } // 清零。调整缓冲槽队列索引及计数 memclr(chanbuf(c, c.recvx), uintptr(c.elemsize)) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount— // 现在有空位了,唤醒某个排队的发送者向槽中发送数据 sg := c.sendq.dequeue() if sg != nil { gp := sg.g goready(gp, 3) } selected = true received = true return }

关闭

关闭操作将所有排队者唤醒,并通过chan.closed、g.param参数告知由close发出。

  • 向closed channel发送数据,触发panic。
  • 从closed channel读取数据,返回零值。
  • 无论收发,nil channel都会阻塞。

chan.go

func closechan(c *hchan) { // 不能重复关闭 if c.closed != 0 { panic(“close of closed channel”) } // 设置关闭标志 c.closed = 1 // 释放所有接收者 for { sg := c.recvq.dequeue() if sg == nil { break } gp := sg.g sg.elem = nil // 这个参数表明唤醒者是 closechan gp.param = nil // 唤醒接收者 goready(gp, 3) } // 释放所有发送者 for { sg := c.sendq.dequeue() if sg == nil { break } gp := sg.g sg.elem = nil // closechan 唤醒 gp.param = nil goready(gp, 3) } }